################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import filecmp
import os

from pyflink.gen_protos import generate_proto_files
from pyflink.testing.test_case_utils import PyFlinkTestCase


class FlinkFnExecutionTests(PyFlinkTestCase):
    """
    Tests whether flink_fn_exeution_pb2.py and flink_fn_exeution_pb2.pyi is synced with flink-fn-execution.proto.
    """

    flink_fn_execution_pb2_file_name = "flink_fn_execution_pb2.py"
    flink_fn_execution_pb2i_file_name = "flink_fn_execution_pb2.pyi"
    gen_protos_script = "gen_protos.py"
    flink_fn_execution_proto_file_name = "flink-fn-execution.proto"

    def check_file_content(self, file_name):
        expected = os.path.join(self.tempdir, file_name)
        actual = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', file_name)
        self.assertTrue(filecmp.cmp(expected, actual),
                        'File %s should be re-generated by executing %s as %s has changed.'
                        % (file_name,
                           self.gen_protos_script,
                           self.flink_fn_execution_proto_file_name))

    def test_flink_fn_execution_pb2_synced(self):
        generate_proto_files('True', self.tempdir)
        self.check_file_content(self.flink_fn_execution_pb2_file_name)
        self.check_file_content(self.flink_fn_execution_pb2i_file_name)

    def test_state_ttl_config_proto(self):
        from pyflink.datastream.state import StateTtlConfig
        from pyflink.common.time import Time
        state_ttl_config = StateTtlConfig \
            .new_builder(Time.milliseconds(1000)) \
            .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite) \
            .set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired) \
            .cleanup_full_snapshot() \
            .cleanup_incrementally(10, True) \
            .cleanup_in_rocksdb_compact_filter(1000) \
            .build()
        state_ttl_config_proto = state_ttl_config._to_proto()
        state_ttl_config = StateTtlConfig._from_proto(state_ttl_config_proto)
        self.assertEqual(state_ttl_config.get_ttl(), Time.milliseconds(1000))
        self.assertEqual(
            state_ttl_config.get_update_type(), StateTtlConfig.UpdateType.OnCreateAndWrite)
        self.assertEqual(
            state_ttl_config.get_state_visibility(),
            StateTtlConfig.StateVisibility.NeverReturnExpired)
        self.assertEqual(
            state_ttl_config.get_ttl_time_characteristic(),
            StateTtlConfig.TtlTimeCharacteristic.ProcessingTime)

        cleanup_strategies = state_ttl_config.get_cleanup_strategies()
        self.assertTrue(cleanup_strategies.is_cleanup_in_background())
        self.assertTrue(cleanup_strategies.in_full_snapshot())

        incremental_cleanup_strategy = cleanup_strategies.get_incremental_cleanup_strategy()
        self.assertEqual(incremental_cleanup_strategy.get_cleanup_size(), 10)
        self.assertTrue(incremental_cleanup_strategy.run_cleanup_for_every_record())

        rocksdb_compact_filter_cleanup_strategy = \
            cleanup_strategies.get_rocksdb_compact_filter_cleanup_strategy()
        self.assertEqual(
            rocksdb_compact_filter_cleanup_strategy.get_query_time_after_num_entries(), 1000)
